dbt Cloud Orchestration で Fivetran  から dbt Cloud のジョブを実行する #Fivetran

dbt Cloud Orchestration で Fivetran から dbt Cloud のジョブを実行する #Fivetran

Clock Icon2024.08.22

はじめに

Fivetran では dbt Cloud Orchestration として Fivetran 側から dbt Cloud のジョブをトリガー実行できる機能が提供されています。こちらの一通りの手順をまとめてみたく記事としました。

また、本機能については以下の記事でも紹介されていますので、こちらもあわせてご参照ください。

https://dev.classmethod.jp/articles/fivetran-2024-02-transformation-dbt-cloud/

概要

公式ドキュメントとしては以下に記載があります。

https://fivetran.com/docs/transformations/dbt-cloud

dbt Cloud でもスケジュールなどでのジョブの実行が可能ですが、この機能では以下のいずれかをトリガーに Fivetran から dbt Cloud のジョブを実行できます。

前提条件

  • dbt Cloud のプラン
    • Team または Enterprise
    • dbt Cloud Orchestration を使用するには、dbt Cloud への API アクセスが必要なため

https://www.getdbt.com/pricing

本機能の制約

  • 1つのプロジェクトは、1つの宛先にしか設定できないため、同じプロジェクトを他の宛先に設定したい場合は、最初にそのプロジェクトを現在の宛先から削除する必要があります
  • 統合スケジュールでは、ジョブをトリガーするすべてのコネクタは、同じ宛先に関連付けられている必要があります
  • コネクタの同期により、同期で新しいデータが到着したかどうかに関係なく、ジョブの実行自体はトリガーされます

事前準備

Fivetran 側

宛先には Snowflake を使用します。以下の手順で宛先として定義しておきました。

https://dev.classmethod.jp/articles/fivetran-destination-setting-snowflake/

データソースには Google スプレッドシートを使用します。ここでは ChatGPT に作ってもらった以下のサンプルデータを使用します。

  • orders テーブル
order_id customer_id order_date total_amount product_id quantity
1 101 2024-08-01 150 1 2
2 102 2024-08-03 100 2 1
3 103 2024-08-05 75 3 3
4 101 2024-08-07 300 1 4
5 104 2024-08-09 50 4 1
6 105 2024-08-10 500 2 5
7 106 2024-08-12 150 3 2
8 102 2024-08-13 230 4 3
9 103 2024-08-15 90 1 1
10 107 2024-08-18 250 2 2
  • customer テーブル
customer_id customer_name contact_email
101 Alice [email protected]
102 Bob [email protected]
103 Charlie [email protected]
104 David [email protected]
105 Eve [email protected]
106 Frank [email protected]
107 Grace [email protected]

それぞれをスプレッドシートの異なるシートに張り付け以下の手順でコネクタの設定を行います。

https://dev.classmethod.jp/articles/sync-google-spreadsheet-using-fivetran-to-data-warehouse/

接続設定後、初期同期を行います。この時点で下図のようにテーブルが作成されます。

image

dbt Cloud 側

データロードが完了したら dbt Cloud 側で検証用にサンプルプロジェクトを作成し初期化を行います。models フォルダは以下の構成としました。

models
├── marts
│   └── total_spent_per_customer.sql
├── source.yml
└── staging
    ├── stg_google_sheets__customer.sql
    └── stg_google_sheets__order.sql

各ファイルは以下のように定義しています。

source.yml
version: 2

sources:
  - name: google_sheets
    database: fivetran_database
    schema: google_sheets  
    tables:
      - name: orders
      - name: customer
stg_google_sheets__order.sql
select
	*
from
    {{ source('google_sheets', 'orders') }}
stg_google_sheets__customer.sql
select
	*
from
    {{ source('google_sheets', 'customer') }}
total_spent_per_customer.sql
WITH customer_data AS (
    SELECT 
        customer_id, 
        customer_name
    FROM 
        {{ ref('stg_google_sheets__customer') }}
),
order_data AS (
    SELECT 
        customer_id, 
        total_amount
    FROM 
        {{ ref('stg_google_sheets__order') }}
),

final AS (

    SELECT 
        customer_data.customer_id, 
        customer_data.customer_name, 
        SUM(order_data.total_amount) AS total_spent
    FROM 
        customer_data
    JOIN 
        order_data ON customer_data.customer_id = order_data.customer_id
    GROUP BY 
        customer_data.customer_id, customer_data.customer_name
    ORDER BY 
        total_spent DESC

)

SELECT
	*
FROM 
	final

リネージグラフは下図のようになります。

image 1

あわせてdbt_project.ymlmatrs フォルダのモデルはテーブルとして作成されるように設定しておきました。

dbt_project.yml の一部
models:
  my_new_project:
    marts:
      +materialized: table

開発環境のスキーマでテスト後、環境とジョブを定義します。

  • 環境の定義:prod スキーマに作成する設定

image 2

  • ジョブの設定

Fivetran 側でトリガー実行するため、ここでは dbt Cloud 側ではスケジュール等のトリガーの設定は行いませんでした。

image 3

セットアップ

事前準備ができたので、以降の手順で dbt Cloud Orchestration の設定を行います。ドキュメントとしては以下に記載があります。

https://fivetran.com/docs/transformations/dbt-cloud/setup-guide

dbt Cloud:サービス トークンの取得

Fivetran から dbt Cloud のジョブをトリガーするために、トークンを取得します。dbt Cloud にログインし右上の歯車マークから「Account settings > Service tokens」をクリックします。

2024-08-22_12h06_20

トークンの一覧画面が開くので [+ Create service token] をクリックします。

image 5

任意のトークン名を入力し「Permission set」でトークンのパーミッションを設定します。ここでは dbt Cloud のプランに応じてそれぞれ以下の通り設定します。

  • Team プラン:Member
  • Enterprise プラン:Developer

あわせてトークンでアクセス可能なプロジェクトと環境を指定します。下図ではパーミッションとして「Developer」を選択しています。

image 6

設定を保存するとトークンが表示されるので、安全な場所に控えておきます。

Fivetran:dbt Cloud Orchestration の設定

Fivetran ダッシュボードの「Transformations」をクリックし、対象となる宛先を選択します。

image 7

はじめての場合、dbt Cloud のリージョン、トークンの登録画面が表示されるので、先の手順で取得した値を入力し [Authenticate] をクリックします。するとトークン権限でアクセス可能な dbt プロジェクトを選択できるので、ドロップダウンから選択します。

image 8

設定を保存するとプロジェクトへの接続テストが実行されます。

image 9

テストが終わると下図の表示になるので [Add Transforamtion] をクリックします。

image 10

どのタイプの変換を行うかを選択します。ここでは dbt Cloud 上のプロジェクトを使用した変換を行いたいので、dbt Cloud の [Add transormations] をクリックします。

image 11

すると下図の表示となり、対象のプロジェクト内でアクセス可能なジョブが表示されます。

image 12

先の手順で [Manage project] をクリックすると、接続解除などのプロジェクト設定を行えます。

image 13

トリガー実行したいジョブを選択すると、どのようなスケジュール実行とするか選択できます。ここでは、Fivetran の同期とあわせてジョブをトリガーしたいので、「Integrated」を選択します。

image 14

なお、「Custom」を選択すると指定の頻度でスケジュール実行が可能です。

image 15

次に、対象の宛先でジョブのトリガー条件となるコネクタを指定します。複数のコネクタを指定可能です。

image 16

コネクタを指定後 [Save and Run] をクリックして設定を完了です。同期のタイミングでトリガーされるため、設定完了後すぐにジョブがトリガーされることはありません。

image 17

コネクタの同期を実行

ここでは、手動でコネクタの同期を実行してみます。

image 18

同期開始後、すぐにジョブ側もを実行状態となっていました。

image 19
実行完了後は下図の表示となります。

image 20

Snowflake 側を確認するとジョブで指定した環境で設定したスキーマにオブジェクトが作成されています。

image 21

データも確認してみます。データマートを想定したテーブルは下図のように作成されています。(この時点で7レコード)

image 22

レコードを追加

さいごにデータソース側でレコードを追加してみます。

  • orders テーブル:青枠の3レコードを追加

image 23

  • customer テーブル:青枠の2レコードを追加

image 24

レコードを追加後、同期を実行します。実行後 Snowflake 側でそれぞれテーブルを確認します。

  • orders テーブル:赤枠のレコードが追加

image 25

  • customer テーブル:赤枠のレコードが追加

image 26

dbt Cloud との連携を実施しているので、同期と合わせてジョブも実行されます。

image 27

変換処理の結果(モデル)を見ると、ジョブが実行された結果こちらも更新されていることが確認できます。赤枠のレコードがデータソースの更新とあわせて追加・更新されています。

image 28

さいごに

Fivetran の dbt Cloud Orchestration で dbt Cloud のジョブをトリガーしてみました。Fivetran のデータロード機能と dbt Cloud の変換機能それぞれの強みを活用しつつ簡単に連携設定ができる機能なので組み合わせて使っていけると便利だなと感じました。こちらの内容が何かの参考になれば幸いです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.